版本管理概述#
Skills 的版本管理是维护和演进 Skills 的重要机制。本节将详细介绍 Skills 的版本控制、版本发布和版本迁移等内容。
版本控制#
1. 版本号规范#
1.1 语义化版本
语义化版本(SemVer)#
版本格式#
主版本号.次版本号.修订号(MAJOR.MINOR.PATCH)
版本规则#
- 主版本号(MAJOR):不兼容的 API 修改
- 次版本号(MINOR):向下兼容的功能性新增
- 修订号(PATCH):向下兼容的问题修正
示例#
- 1.0.0 → 1.1.0:新增功能(向下兼容)
- 1.1.0 → 1.1.1:修复 bug(向下兼容)
- 1.1.1 → 2.0.0:不兼容的 API 变更
1.2 预发布版本
格式#
主版本号.次版本号.修订号-预发布标识.预发布版本号
预发布标识#
- alpha:内部测试版
- beta:公开测试版
- rc:候选发布版
示例#
- 1.0.0-alpha.1:第一个 alpha 版本
- 1.0.0-beta.1:第一个 beta 版本
- 1.0.0-rc.1:第一个候选版本
1.3 构建元数据
格式#
主版本号.次版本号.修订号+构建元数据
构建元数据#
- 构建日期
- 构建号
- Git 提交哈希
示例#
- 1.0.0+20240115
- 1.0.0+build.123
- 1.0.0+abc123def456
2. 版本比较#
2.1 版本比较算法
class VersionComparator:
    """Order version strings of the form MAJOR.MINOR.PATCH[-prerelease][+build].

    Build metadata is parsed and returned but never influences ordering.
    """

    def compare(self, v1, v2):
        """Compare two version strings.

        Returns:
            -1 if ``v1`` sorts before ``v2``, 1 if it sorts after, and 0 when
            both have the same precedence.

        Raises:
            ValueError: if a numeric component cannot be converted to ``int``.
        """
        left = self.parse_version(v1)
        right = self.parse_version(v2)

        for field in ("major", "minor", "patch"):
            if left[field] != right[field]:
                return -1 if left[field] < right[field] else 1

        left_pre = left["prerelease"]
        right_pre = right["prerelease"]
        if left_pre and right_pre:
            return self.compare_prerelease(left_pre, right_pre)
        # A pre-release sorts before the release with the same numbers.
        if left_pre:
            return -1
        if right_pre:
            return 1
        return 0

    def parse_version(self, version):
        """Split a version string into its components.

        Returns:
            A dict with integer ``major``/``minor``/``patch`` (missing trailing
            numbers default to 0) and string-or-None ``prerelease``/``build``.

        Raises:
            ValueError: if a numeric component is not an integer.
        """
        # Build metadata must be removed first: it may itself contain "-" or
        # ".", which would otherwise corrupt the pre-release and number parsing.
        core, plus, build = version.partition("+")
        build_metadata = build if plus else None

        core, dash, prerelease = core.partition("-")
        prerelease_part = prerelease if dash else None

        numbers = core.split(".")
        major = int(numbers[0])
        minor = int(numbers[1]) if len(numbers) > 1 else 0
        patch = int(numbers[2]) if len(numbers) > 2 else 0

        return {
            "major": major,
            "minor": minor,
            "patch": patch,
            "prerelease": prerelease_part,
            "build": build_metadata,
        }

    def compare_prerelease(self, p1, p2):
        """Compare two dot-separated pre-release strings; return -1, 0 or 1.

        Identifiers are compared pairwise: numerically when both are digits,
        as strings when both are not, and a numeric identifier sorts before a
        non-numeric one. If all shared identifiers are equal, the string with
        fewer identifiers sorts first.
        """
        left_ids = p1.split(".")
        right_ids = p2.split(".")

        for left_id, right_id in zip(left_ids, right_ids):
            left_numeric = left_id.isdigit()
            right_numeric = right_id.isdigit()
            if left_numeric and right_numeric:
                left_key, right_key = int(left_id), int(right_id)
            elif left_numeric != right_numeric:
                return -1 if left_numeric else 1
            else:
                left_key, right_key = left_id, right_id
            if left_key != right_key:
                return -1 if left_key < right_key else 1

        if len(left_ids) != len(right_ids):
            return -1 if len(left_ids) < len(right_ids) else 1
        return 0
2.2 版本范围
class VersionRange:
    """Check versions against a range expression.

    Supported syntax: a bare version (exact match), the comparators ``>=``,
    ``<=``, ``>``, ``<``, the caret ``^`` and tilde ``~`` shorthands,
    whitespace-separated comparators (all must hold) and ``||`` between
    alternatives (any one may hold).
    """

    # Two-character operators come first so ">=" is not read as ">".
    _OPERATORS = (">=", "<=", "^", "~", ">", "<")

    def __init__(self, range_spec):
        self.range_spec = range_spec
        self.comparator = VersionComparator()

    def satisfies(self, version):
        """Return True if ``version`` matches at least one ``||`` alternative.

        An expression with no comparators at all matches every version.
        """
        groups = self._parse_groups(self.range_spec)
        if not groups:
            return True
        return any(
            all(self.check_constraint(version, constraint) for constraint in group)
            for group in groups
        )

    def parse_range(self, range_spec):
        """Return every constraint in ``range_spec`` as a flat list of dicts.

        Each dict has the keys ``operator`` and ``version``. The ``||``
        grouping is not represented in the flat list; ``satisfies`` uses the
        grouped form internally.
        """
        return [
            constraint
            for group in self._parse_groups(range_spec)
            for constraint in group
        ]

    def _parse_groups(self, range_spec):
        """Parse ``range_spec`` into a list of AND-groups, one per alternative."""
        groups = []
        for alternative in range_spec.split("||"):
            group = [
                self._parse_comparator(token)
                for token in self._tokenize(alternative)
            ]
            if group:
                groups.append(group)
        return groups

    def _tokenize(self, alternative):
        """Split one alternative into comparator tokens such as ``>=1.0.0``."""
        tokens = []
        pending_operator = ""
        for piece in alternative.split():
            if piece in self._OPERATORS:
                # Operator written apart from its version (">= 1.0.0").
                pending_operator = piece
                continue
            tokens.append(pending_operator + piece)
            pending_operator = ""
        return tokens

    def _parse_comparator(self, token):
        """Turn a single token into an ``operator``/``version`` dict."""
        for operator in self._OPERATORS:
            if token.startswith(operator):
                return {"operator": operator, "version": token[len(operator):]}
        return {"operator": "==", "version": token}

    def check_constraint(self, version, constraint):
        """Return True if ``version`` satisfies one constraint dict.

        An unrecognised operator yields False.
        """
        operator = constraint["operator"]
        target = constraint["version"]

        if operator == "^":
            return self.check_caret(version, target)
        if operator == "~":
            return self.check_tilde(version, target)
        if operator not in ("==", ">=", "<=", ">", "<"):
            return False

        order = self.comparator.compare(version, target)
        outcomes = {
            "==": order == 0,
            ">=": order >= 0,
            "<=": order <= 0,
            ">": order > 0,
            "<": order < 0,
        }
        return outcomes[operator]

    def check_caret(self, version, constraint):
        """Return True if ``version`` lies in the caret range of ``constraint``.

        The upper bound bumps the left-most non-zero component, so ``^1.2.3``
        allows ``<2.0.0``, ``^0.2.3`` allows ``<0.3.0`` and ``^0.0.3`` allows
        ``<0.0.4``.
        """
        parts = self.comparator.parse_version(constraint)
        major, minor, patch = parts["major"], parts["minor"], parts["patch"]
        if major > 0:
            upper_bound = f"{major + 1}.0.0"
        elif minor > 0:
            upper_bound = f"0.{minor + 1}.0"
        else:
            upper_bound = f"0.0.{patch + 1}"
        return (
            self.comparator.compare(version, constraint) >= 0
            and self.comparator.compare(version, upper_bound) < 0
        )

    def check_tilde(self, version, constraint):
        """Return True if ``constraint <= version < next minor release``."""
        parts = self.comparator.parse_version(constraint)
        upper_bound = f"{parts['major']}.{parts['minor'] + 1}.0"
        return (
            self.comparator.compare(version, constraint) >= 0
            and self.comparator.compare(version, upper_bound) < 0
        )
3. 版本存储#
3.1 版本仓库
class VersionRepository:
    """Store each skill version as ``<storage_path>/<skill_id>/<version>/skill.json``."""

    def __init__(self, storage_path):
        self.storage_path = storage_path
        self.comparator = VersionComparator()

    def save_version(self, skill_id, version, skill_data):
        """Write ``skill_data`` as JSON, creating parent directories as needed."""
        version_path = self.get_version_path(skill_id, version)
        os.makedirs(os.path.dirname(version_path), exist_ok=True)
        with open(version_path, "w", encoding="utf-8") as handle:
            json.dump(skill_data, handle, indent=2)

    def load_version(self, skill_id, version):
        """Return the stored data for one version.

        Raises:
            VersionNotFoundError: if no file exists for that version.
        """
        version_path = self.get_version_path(skill_id, version)
        if not os.path.exists(version_path):
            raise VersionNotFoundError(version)
        with open(version_path, "r", encoding="utf-8") as handle:
            return json.load(handle)

    def list_versions(self, skill_id):
        """Return the version directory names for a skill, lowest version first.

        Returns an empty list when the skill has no directory.
        """
        from functools import cmp_to_key

        skill_path = self.get_skill_path(skill_id)
        if not os.path.exists(skill_path):
            return []

        versions = [
            entry
            for entry in os.listdir(skill_path)
            if os.path.isdir(os.path.join(skill_path, entry))
        ]
        # parse_version() returns a dict, which cannot be used as a sort key;
        # order through the comparator's three-way compare instead.
        versions.sort(key=cmp_to_key(self.comparator.compare))
        return versions

    def get_latest_version(self, skill_id):
        """Return the highest stored version, or None if there are none."""
        versions = self.list_versions(skill_id)
        if not versions:
            return None
        return versions[-1]

    def get_version_path(self, skill_id, version):
        """Return the path of the ``skill.json`` file for one version."""
        return os.path.join(self.storage_path, skill_id, version, "skill.json")

    def get_skill_path(self, skill_id):
        """Return the directory that holds every version of a skill."""
        return os.path.join(self.storage_path, skill_id)
3.2 版本索引
class VersionIndex:
    """JSON-file index mapping ``skill_id -> version -> metadata``."""

    def __init__(self, index_path):
        self.index_path = index_path
        self.index = self.load_index()

    def load_index(self):
        """Read the index file, or return an empty dict if it does not exist."""
        if os.path.exists(self.index_path):
            with open(self.index_path, "r", encoding="utf-8") as handle:
                return json.load(handle)
        return {}

    def save_index(self):
        """Write the in-memory index to the index file."""
        with open(self.index_path, "w", encoding="utf-8") as handle:
            json.dump(self.index, handle, indent=2)

    def add_version(self, skill_id, version, metadata):
        """Record metadata for one version and save the index immediately."""
        self.index.setdefault(skill_id, {})[version] = metadata
        self.save_index()

    def get_version_metadata(self, skill_id, version):
        """Return the metadata for one version, or None if it is not indexed."""
        return self.index.get(skill_id, {}).get(version)

    def list_versions(self, skill_id):
        """Return the indexed version strings of a skill in index order."""
        return list(self.index.get(skill_id, {}))

    def get_latest_version(self, skill_id):
        """Return the highest indexed version, or None if there are none."""
        from functools import cmp_to_key

        versions = self.list_versions(skill_id)
        if not versions:
            return None
        comparator = VersionComparator()
        # parse_version() returns a dict, which max() cannot order; rank
        # versions through the comparator's three-way compare instead.
        return max(versions, key=cmp_to_key(comparator.compare))

    def search_versions(self, skill_id, version_range):
        """Return the indexed versions that satisfy ``version_range``."""
        range_checker = VersionRange(version_range)
        return [
            version
            for version in self.list_versions(skill_id)
            if range_checker.satisfies(version)
        ]
版本发布#
1. 发布流程#
1.1 发布准备
class ReleasePreparer:
    """Validate a planned release and assemble its metadata."""

    def __init__(self):
        self.version_repository = VersionRepository("./skills")
        self.version_index = VersionIndex("./index.json")

    def prepare_release(self, skill_id, version, release_notes):
        """Return the metadata dict for a new release.

        The dict holds ``version``, ``release_date`` (ISO timestamp),
        ``release_notes`` and ``checksum``.

        Raises:
            InvalidVersionError: if ``version`` fails ``validate_version``.
            VersionAlreadyExistsError: if the repository already lists it.
        """
        if not self.validate_version(version):
            raise InvalidVersionError(version)
        if self.version_exists(skill_id, version):
            raise VersionAlreadyExistsError(version)

        skill_data = self.load_skill_data(skill_id)
        return {
            "version": version,
            "release_date": datetime.now().isoformat(),
            "release_notes": release_notes,
            "checksum": self.calculate_checksum(skill_data),
        }

    def validate_version(self, version):
        """Return True if ``version`` is MAJOR.MINOR.PATCH[-prerelease][+build].

        Pre-release and build parts are dot-separated identifiers made of
        ASCII letters, digits and hyphens, so ``1.0.0+build.123`` and
        ``1.0.0-rc.1+abc123`` are accepted.
        """
        identifiers = r"[0-9A-Za-z-]+(\.[0-9A-Za-z-]+)*"
        pattern = r"\d+\.\d+\.\d+(-" + identifiers + r")?(\+" + identifiers + r")?"
        # fullmatch, unlike match with "$", also rejects a trailing newline.
        return re.fullmatch(pattern, version) is not None

    def version_exists(self, skill_id, version):
        """Return True if the repository already lists ``version``."""
        return version in self.version_repository.list_versions(skill_id)

    def load_skill_data(self, skill_id):
        """Placeholder: not implemented here, so it returns None."""
        pass

    def calculate_checksum(self, skill_data):
        """Return the SHA-256 hex digest of the key-sorted JSON form."""
        canonical = json.dumps(skill_data, sort_keys=True)
        return hashlib.sha256(canonical.encode()).hexdigest()
1.2 发布执行
class ReleasePublisher:
    """Store a release, index it, tag it in git and notify subscribers."""

    def __init__(self):
        self.version_repository = VersionRepository("./skills")
        self.version_index = VersionIndex("./index.json")

    def publish_release(self, skill_id, version, skill_data, metadata):
        """Run the publish steps in order: save, index, tag, notify.

        Raises:
            subprocess.CalledProcessError: if the ``git tag`` command fails.
        """
        self.version_repository.save_version(skill_id, version, skill_data)
        self.version_index.add_version(skill_id, version, metadata)
        self.create_tag(skill_id, version)
        self.notify_subscribers(skill_id, version, metadata)

    def create_tag(self, skill_id, version):
        """Create the annotated git tag ``<skill_id>/v<version>``."""
        tag_name = f"{skill_id}/v{version}"
        tag_message = f"Release {skill_id} version {version}"
        # Argument list, no shell: the names are never shell-interpreted.
        subprocess.run(
            ["git", "tag", "-a", tag_name, "-m", tag_message],
            check=True,
        )

    def notify_subscribers(self, skill_id, version, metadata):
        """Send one notification per subscriber of the skill."""
        # "or []" keeps an implementation that returns None from crashing
        # the loop after the release has already been stored and tagged.
        for subscriber in self.get_subscribers(skill_id) or []:
            self.send_notification(subscriber, skill_id, version, metadata)

    def get_subscribers(self, skill_id):
        """Placeholder: no subscriber source is implemented, returns ``[]``."""
        return []

    def send_notification(self, subscriber, skill_id, version, metadata):
        """Placeholder: no delivery mechanism is implemented here."""
        pass
2. 变更日志#
2.1 变更记录
class ChangeLogManager:
    """Keep per-version change logs in memory and render release notes."""

    # (change_type key, heading) pairs in the order the sections are rendered.
    _SECTIONS = (
        ("added", "Added"),
        ("changed", "Changed"),
        ("deprecated", "Deprecated"),
        ("removed", "Removed"),
        ("fixed", "Fixed"),
        ("security", "Security"),
    )

    def __init__(self):
        self.change_logs = {}

    def add_change(self, skill_id, version, change_type, description):
        """Append ``description`` to one section of a version's change log.

        The version record is created on first use and stamped with the
        current time.

        Raises:
            KeyError: if ``change_type`` is not a known section. The check
                runs first, so a bad type leaves no empty record behind.
        """
        if change_type not in dict(self._SECTIONS):
            raise KeyError(change_type)

        skill_logs = self.change_logs.setdefault(skill_id, {})
        if version not in skill_logs:
            skill_logs[version] = {
                "version": version,
                "date": datetime.now().isoformat(),
                "changes": {key: [] for key, _ in self._SECTIONS},
            }
        skill_logs[version]["changes"][change_type].append(description)

    def get_change_log(self, skill_id, version):
        """Return the record for one version, or None if nothing is recorded."""
        return self.change_logs.get(skill_id, {}).get(version)

    def get_all_change_logs(self, skill_id):
        """Return the ``version -> record`` dict of a skill (empty if unknown)."""
        return self.change_logs.get(skill_id, {})

    def generate_release_notes(self, skill_id, version):
        """Render a version's change log as Markdown text.

        Returns ``"No changes recorded."`` when the version has no record.
        Empty sections are omitted; each rendered section ends with a blank
        line.
        """
        change_log = self.get_change_log(skill_id, version)
        if not change_log:
            return "No changes recorded."

        notes = [f"# Release {version}", f"Date: {change_log['date']}", ""]
        changes = change_log["changes"]
        for key, heading in self._SECTIONS:
            entries = changes[key]
            if not entries:
                continue
            notes.append(f"## {heading}")
            notes.extend(f"- {entry}" for entry in entries)
            notes.append("")
        return "\n".join(notes)
2.2 变更对比
class ChangeComparator:
    """Diff the ``parameters`` and ``features`` of two stored skill versions."""

    def __init__(self):
        self.version_repository = VersionRepository("./skills")

    def compare_versions(self, skill_id, from_version, to_version):
        """Return the ``added``/``removed``/``modified`` lists between two versions.

        Each list holds parameter names first, then features.
        """
        from_data = self.version_repository.load_version(skill_id, from_version)
        to_data = self.version_repository.load_version(skill_id, to_version)

        return {
            "added": (
                self.find_added_parameters(from_data, to_data)
                + self.find_added_features(from_data, to_data)
            ),
            "removed": (
                self.find_removed_parameters(from_data, to_data)
                + self.find_removed_features(from_data, to_data)
            ),
            "modified": (
                self.find_modified_parameters(from_data, to_data)
                + self.find_modified_features(from_data, to_data)
            ),
        }

    def find_added_parameters(self, from_data, to_data):
        """Return parameter names present only in ``to_data``, in its order."""
        previous = from_data.get("parameters", {})
        return [name for name in to_data.get("parameters", {}) if name not in previous]

    def find_removed_parameters(self, from_data, to_data):
        """Return parameter names present only in ``from_data``, in its order."""
        current = to_data.get("parameters", {})
        return [name for name in from_data.get("parameters", {}) if name not in current]

    def find_modified_parameters(self, from_data, to_data):
        """Return names of parameters present in both whose values differ."""
        previous = from_data.get("parameters", {})
        current = to_data.get("parameters", {})
        return [
            name
            for name in previous
            if name in current and previous[name] != current[name]
        ]

    def find_added_features(self, from_data, to_data):
        """Return features present only in ``to_data``, deduplicated, in order."""
        previous = set(from_data.get("features", []))
        return [
            feature
            for feature in dict.fromkeys(to_data.get("features", []))
            if feature not in previous
        ]

    def find_removed_features(self, from_data, to_data):
        """Return features present only in ``from_data``, deduplicated, in order."""
        current = set(to_data.get("features", []))
        return [
            feature
            for feature in dict.fromkeys(from_data.get("features", []))
            if feature not in current
        ]

    def find_modified_features(self, from_data, to_data):
        """Return an empty list.

        Features are compared as whole values, so a feature is either added
        or removed. Returning a list (not None) lets ``compare_versions``
        concatenate the result.
        """
        return []
版本迁移#
版本迁移#
1. 迁移脚本#
1.1 迁移定义
class Migration:
    """One migration step from ``from_version`` to ``to_version``.

    ``migrate_func`` takes the data to migrate and returns the migrated data.
    """

    def __init__(self, from_version, to_version, migrate_func):
        self.from_version = from_version
        self.to_version = to_version
        self.migrate_func = migrate_func

    def can_migrate(self, from_version, to_version):
        """Return True if this step lies inside the requested upgrade range.

        A step applies when it starts at or after the requested
        ``from_version`` and ends at or before the requested ``to_version``.
        This lets a request such as 1.0.0 -> 2.0.0 collect the intermediate
        steps 1.0.0 -> 1.1.0 and 1.1.0 -> 2.0.0.
        """
        comparator = VersionComparator()
        starts_in_range = comparator.compare(self.from_version, from_version) >= 0
        ends_in_range = comparator.compare(self.to_version, to_version) <= 0
        return starts_in_range and ends_in_range

    def migrate(self, data):
        """Apply ``migrate_func`` to ``data`` and return its result."""
        return self.migrate_func(data)
1.2 迁移注册
class MigrationRegistry:
    """Hold registered migrations and select the ones that fit a version range."""

    def __init__(self):
        self.migrations = []

    def register_migration(self, migration):
        """Add a migration to the registry."""
        self.migrations.append(migration)

    def get_migrations(self, from_version, to_version):
        """Return the applicable migrations ordered by ascending ``from_version``.

        A migration is applicable when its ``can_migrate(from_version,
        to_version)`` returns a truthy value.
        """
        from functools import cmp_to_key

        applicable = [
            migration
            for migration in self.migrations
            if migration.can_migrate(from_version, to_version)
        ]

        comparator = VersionComparator()
        # parse_version() returns a dict, which cannot be used as a sort key;
        # order through the comparator's three-way compare instead.
        applicable.sort(
            key=cmp_to_key(
                lambda left, right: comparator.compare(
                    left.from_version, right.from_version
                )
            )
        )
        return applicable
2. 迁移执行#
2.1 数据迁移
class DataMigrator:
    """Apply registered migrations to skill data.

    Args:
        migration_registry: object whose ``get_migrations(from_version,
            to_version)`` returns the migrations to apply, in order.
        storage_path: root directory of the version repository used by
            ``migrate_skill``; defaults to ``"./skills"``.
    """

    def __init__(self, migration_registry, storage_path="./skills"):
        self.migration_registry = migration_registry
        self.storage_path = storage_path

    def migrate_data(self, data, from_version, to_version):
        """Run ``data`` through every selected migration and return the result.

        With no applicable migration, ``data`` is returned unchanged.
        """
        migrated = data
        for migration in self.migration_registry.get_migrations(
            from_version, to_version
        ):
            migrated = migration.migrate(migrated)
        return migrated

    def migrate_skill(self, skill_id, from_version, to_version):
        """Load a stored version, migrate it and save it under ``to_version``.

        Returns:
            The migrated data that was saved.
        """
        version_repository = VersionRepository(self.storage_path)
        data = version_repository.load_version(skill_id, from_version)
        migrated_data = self.migrate_data(data, from_version, to_version)
        version_repository.save_version(skill_id, to_version, migrated_data)
        return migrated_data
2.2 配置迁移
class ConfigMigrator:
    """Apply registered migrations to configuration data."""

    def __init__(self, migration_registry):
        self.migration_registry = migration_registry

    def migrate_config(self, config, from_version, to_version):
        """Run ``config`` through every selected migration and return the result."""
        selected = self.migration_registry.get_migrations(from_version, to_version)
        result = config
        for step in selected:
            result = step.migrate(result)
        return result

    def migrate_user_config(self, user_id, from_version, to_version):
        """Load a user's config, migrate it, save it and return it."""
        migrated_config = self.migrate_config(
            self.load_user_config(user_id), from_version, to_version
        )
        self.save_user_config(user_id, migrated_config)
        return migrated_config

    def load_user_config(self, user_id):
        """Placeholder: not implemented here, so it returns None."""
        pass

    def save_user_config(self, user_id, config):
        """Placeholder: not implemented here."""
        pass
版本回滚#
1. 回滚准备#
class RollbackPreparer:
    """Build a rollback plan: check the target, back up, list the steps."""

    def __init__(self):
        self.version_repository = VersionRepository("./skills")
        self.backup_repository = BackupRepository("./backups")

    def prepare_rollback(self, skill_id, from_version, to_version):
        """Return a rollback plan dict for going from ``from_version`` to ``to_version``.

        Raises:
            VersionNotFoundError: if ``to_version`` is not in the repository.
        """
        # Check the target version before touching anything.
        target_known = self.version_exists(skill_id, to_version)
        if not target_known:
            raise VersionNotFoundError(to_version)

        # Back up the version being rolled away from.
        backup_id = self.create_backup(skill_id, from_version)

        # Assemble the plan.
        steps = self.generate_rollback_steps(skill_id, from_version, to_version)
        return {
            "skill_id": skill_id,
            "from_version": from_version,
            "to_version": to_version,
            "backup_id": backup_id,
            "steps": steps,
        }

    def version_exists(self, skill_id, version):
        """Return True if the repository lists ``version`` for the skill."""
        return version in self.version_repository.list_versions(skill_id)

    def create_backup(self, skill_id, version):
        """Ask the backup repository for a backup and return its id."""
        return self.backup_repository.create_backup(skill_id, version)

    def generate_rollback_steps(self, skill_id, from_version, to_version):
        """Return the five ordered rollback steps as dicts."""
        outline = (
            ("stop_skill", "停止 Skill"),
            ("restore_version", f"恢复到版本 {to_version}"),
            ("migrate_config", "迁移配置"),
            ("start_skill", "启动 Skill"),
            ("verify", "验证回滚"),
        )
        return [
            {"step": number, "action": action, "description": description}
            for number, (action, description) in enumerate(outline, start=1)
        ]
### 2.
# Rollback execution
class RollbackExecutor:
    """Execute the steps of a rollback plan, restoring the backup on failure."""

    def __init__(self):
        self.version_repository = VersionRepository("./skills")
        self.backup_repository = BackupRepository("./backups")
        self.config_migrator = ConfigMigrator(MigrationRegistry())

    def execute_rollback(self, rollback_plan):
        """Run every step of the plan in order and return True.

        If any step raises an ``Exception``, the plan's backup is restored
        and the same exception is raised again.
        """
        try:
            for step in rollback_plan["steps"]:
                self.execute_step(rollback_plan, step)
        except Exception as error:
            self.restore_backup(rollback_plan["backup_id"])
            raise error
        return True

    def execute_step(self, rollback_plan, step):
        """Dispatch one step to its handler; unknown actions do nothing."""
        # Lambdas defer the plan lookups until the matching action is chosen.
        handlers = {
            "stop_skill": lambda: self.stop_skill(rollback_plan["skill_id"]),
            "restore_version": lambda: self.restore_version(
                rollback_plan["skill_id"],
                rollback_plan["to_version"],
            ),
            "migrate_config": lambda: self.migrate_config(
                rollback_plan["from_version"],
                rollback_plan["to_version"],
            ),
            "start_skill": lambda: self.start_skill(rollback_plan["skill_id"]),
            "verify": lambda: self.verify_rollback(rollback_plan),
        }
        handler = handlers.get(step["action"])
        if handler is not None:
            handler()

    def stop_skill(self, skill_id):
        """Placeholder: not implemented here."""
        pass

    def restore_version(self, skill_id, version):
        """Placeholder: not implemented here."""
        pass

    def migrate_config(self, from_version, to_version):
        """Placeholder: not implemented here."""
        pass

    def start_skill(self, skill_id):
        """Placeholder: not implemented here."""
        pass

    def verify_rollback(self, rollback_plan):
        """Placeholder: not implemented here."""
        pass

    def restore_backup(self, backup_id):
        """Restore the given backup through the backup repository."""
        self.backup_repository.restore_backup(backup_id)